//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2020 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Atomics
import NIOConcurrencyHelpers
import NIOCore

/// A NIO `Channel` that encapsulates a single SSH `Channel`.
///
/// SSH is a multiplexed protocol. NIO reflects this reality by setting up `SSHChildChannel`s for transmitting data
/// back-and-forth. These `Channel`s have a bunch of unique powers reflected in terms of specific `ChannelOption`s
/// for SSH purposes. In some cases it is possible to treat SSH channels as naive data pipes, but often the channels
/// are intended for specific use-cases and need to be treated as special cases.
final class SSHChildChannel {
    private var _pipeline: ChannelPipeline!

    private let closePromise: EventLoopPromise<Void>

    private let multiplexer: SSHChannelMultiplexer

    private var state: ChildChannelStateMachine

    private var windowManager: ChildChannelWindowManager

    /// If close0 was called but the channel could not synchronously close (because it's currently
    /// active), the promise is stored here until it can be fulfilled.
    private var pendingClosePromise: EventLoopPromise<Void>?

    /// A buffer of pending inbound reads delivered from the parent channel.
    private var pendingReads: CircularBuffer<PendingContent> = CircularBuffer(initialCapacity: 8)

    /// Whether `autoRead` is enabled. By default, all `SSHChildChannel` objects inherit their `autoRead`
    /// state from their parent.
    private var autoRead: Bool

    /// Whether this channel supports remote half-closure. By default this is set to false.
    private var allowRemoteHalfClosure: Bool

    /// Whether a call to `read` has happened without any messages available to read (that is, whether newly
    /// received messages should be immediately delivered to the pipeline).
    private var unsatisfiedRead: Bool = false

    /// Whether we had written an automatic (i.e. generated by this channel) message.
    private var didWriteAutomaticMessage: Bool

    /// A buffer of pending outbound writes from the user.
    ///
    /// To correctly respect flushes, we deliberately withold data from the parent channel until this
    /// stream is flushed, at which time we deliver them all. This buffer holds the pending ones.
    private var pendingWritesFromChannel: MarkedCircularBuffer<(PendingContent, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 8)

    /// A buffer of pending outbound messages for the parent channel.
    ///
    /// This buffer exists to avoid message re-ordering issues when we make outcalls. Some messages
    /// trigger multiple outcalls, any of which could interrupt message delivery or event ordering.
    /// To avoid difficulty here, we make sure we enqueue the writes for the multiplexer here.
    private var pendingWritesForMultiplexer: CircularBuffer<(SSHMessage, EventLoopPromise<Void>?)> = CircularBuffer(initialCapacity: 8)

    /// When we're activating, outbound operations cannot be processed. We buffer them here temporarily and re-process them
    /// once we've activated. After activation this list should be empty.
    private var bufferedOutboundOperations: CircularBuffer<OutboundControlOperation> = CircularBuffer(initialCapacity: 8)

    /// An object that controls whether this channel should be writable.
    private var writabilityManager: ChildChannelWritabilityManager

    /// The max message size set by the peer. We initialize it to zero to begin with.
    private var peerMaxMessageSize: UInt32

    private var activationState: ActivationState

    // MARK: Stored properties for channel/channelcore conformance.

    public let allocator: ByteBufferAllocator

    public let parent: Channel?

    public let eventLoop: EventLoop

    private let _isWritable: ManagedAtomic<Bool>

    private let _isActive: ManagedAtomic<Bool>

    typealias Initializer = (Channel, SSHChannelType) -> EventLoopFuture<Void>

    private var initializer: Initializer?

    private var didClose = false

    private var type: SSHChannelType?

    /// A promise from the user that will be fired when the channel goes active.
    private var userActivatePromise: EventLoopPromise<Channel>?

    internal convenience init(allocator: ByteBufferAllocator,
                              parent: Channel,
                              multiplexer: SSHChannelMultiplexer,
                              initializer: Initializer?,
                              localChannelID: UInt32,
                              targetWindowSize: Int32,
                              initialOutboundWindowSize: UInt32) {
        self.init(allocator: allocator,
                  parent: parent,
                  multiplexer: multiplexer,
                  initializer: initializer,
                  initialState: .init(localChannelID: localChannelID),
                  targetWindowSize: targetWindowSize,
                  initialOutboundWindowSize: initialOutboundWindowSize)
    }

    private init(allocator: ByteBufferAllocator,
                 parent: Channel,
                 multiplexer: SSHChannelMultiplexer,
                 initializer: Initializer?,
                 initialState: ChildChannelStateMachine,
                 targetWindowSize: Int32,
                 initialOutboundWindowSize: UInt32) {
        self.allocator = allocator
        self.closePromise = parent.eventLoop.makePromise()
        self.parent = parent
        self.eventLoop = parent.eventLoop
        self.multiplexer = multiplexer
        self.initializer = initializer
        self.windowManager = ChildChannelWindowManager(targetWindowSize: UInt32(targetWindowSize))
        self._isWritable = .init(true)
        self._isActive = .init(false)
        self.state = initialState
        self.writabilityManager = ChildChannelWritabilityManager(initialWindowSize: initialOutboundWindowSize,
                                                                 parentIsWritable: parent.isWritable)
        self.peerMaxMessageSize = 0

        // To begin with we initialize autoRead and halfClosure to false, but we are going to fetch it from our parent before we
        // go much further.
        self.autoRead = false
        self.allowRemoteHalfClosure = false
        self.didWriteAutomaticMessage = false
        self.activationState = .neverActivated
        self._pipeline = ChannelPipeline(channel: self)
    }
}

// `SSHChildChannel` protects its shared state by dispatching to its associated `eventLoop`
extension SSHChildChannel: @unchecked Sendable {}

extension SSHChildChannel: Channel, ChannelCore {
    public var closeFuture: EventLoopFuture<Void> {
        self.closePromise.futureResult
    }

    public var pipeline: ChannelPipeline {
        self._pipeline
    }

    public var localAddress: SocketAddress? {
        self.parent.flatMap { $0.localAddress }
    }

    public var remoteAddress: SocketAddress? {
        self.parent.flatMap { $0.remoteAddress }
    }

    func localAddress0() throws -> SocketAddress {
        try self.parent!._channelCore.localAddress0()
    }

    func remoteAddress0() throws -> SocketAddress {
        try self.parent!._channelCore.localAddress0()
    }

    func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> {
        if self.eventLoop.inEventLoop {
            do {
                return self.eventLoop.makeSucceededFuture(try self.setOption0(option, value: value))
            } catch {
                return self.eventLoop.makeFailedFuture(error)
            }
        } else {
            return self.eventLoop.submit { try self.setOption0(option, value: value) }
        }
    }

    public func getOption<Option: ChannelOption>(_ option: Option) -> EventLoopFuture<Option.Value> {
        if self.eventLoop.inEventLoop {
            do {
                return self.eventLoop.makeSucceededFuture(try self.getOption0(option))
            } catch {
                return self.eventLoop.makeFailedFuture(error)
            }
        } else {
            return self.eventLoop.submit { try self.getOption0(option) }
        }
    }

    private func setOption0<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
        self.eventLoop.preconditionInEventLoop()

        switch option {
        case _ as ChannelOptions.Types.AutoReadOption:
            self.autoRead = value as! Bool
        case _ as ChannelOptions.Types.AllowRemoteHalfClosureOption:
            self.allowRemoteHalfClosure = value as! Bool
        default:
            fatalError("setting option \(option) on SSHChildChannel not supported")
        }
    }

    private func getOption0<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
        self.eventLoop.preconditionInEventLoop()

        switch option {
        case _ as SSHChildChannelOptions.Types.LocalChannelIdentifierOption:
            return self.state.localChannelIdentifier as! Option.Value
        case _ as SSHChildChannelOptions.Types.RemoteChannelIdentifierOption:
            return self.state.remoteChannelIdentifier as! Option.Value
        case _ as SSHChildChannelOptions.Types.SSHChannelTypeOption:
            // This force-unwrap is safe: we set type before we call the initializer, so
            // users can only get this after this value is set.
            return self.type! as! Option.Value
        case _ as SSHChildChannelOptions.Types.PeerMaximumMessageLengthOption:
            return self.peerMaxMessageSize as! Option.Value
        case _ as ChannelOptions.Types.AutoReadOption:
            return self.autoRead as! Option.Value
        case _ as ChannelOptions.Types.AllowRemoteHalfClosureOption:
            return self.allowRemoteHalfClosure as! Option.Value
        default:
            fatalError("option \(option) not supported on SSHChildChannel")
        }
    }

    public var isWritable: Bool {
        self._isWritable.load(ordering: .relaxed)
    }

    public var isActive: Bool {
        self._isActive.load(ordering: .relaxed)
    }

    public var _channelCore: ChannelCore {
        self
    }

    public func register0(promise: EventLoopPromise<Void>?) {
        fatalError("not implemented \(#function)")
    }

    public func bind0(to: SocketAddress, promise: EventLoopPromise<Void>?) {
        fatalError("not implemented \(#function)")
    }

    public func connect0(to: SocketAddress, promise: EventLoopPromise<Void>?) {
        fatalError("not implemented \(#function)")
    }

    public func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
        guard !self.state.isClosed else {
            promise?.fail(ChannelError.ioOnClosedChannel)
            return
        }

        guard !self.state.sentEOF else {
            promise?.fail(ChannelError.outputClosed)
            return
        }

        let bodyData = self.unwrapData(data, as: SSHChannelData.self)
        let writeSize = bodyData.data.readableBytes

        self.pendingWritesFromChannel.append((.data(bodyData), promise))

        // Ok, we can make an outcall now, which means we can safely deal with the flow control.
        if case .changed(newValue: let value) = self.writabilityManager.bufferedBytes(writeSize) {
            self.changeWritability(to: value)
        }
    }

    public func flush0() {
        self.pendingWritesFromChannel.mark()

        if self.isActive {
            self.deliverPendingWrites()
        }

        self.writePendingToMultiplexer()
    }

    public func read0() {
        if self.unsatisfiedRead {
            // We already have an unsatisfied read, let's do nothing.
            return
        }

        // At this stage, we have an unsatisfied read. If there is no pending data to read,
        // we're going to call read() on the parent channel. Otherwise, we're going to
        // succeed the read out of our pending data.
        self.unsatisfiedRead = true
        if self.pendingReads.count > 0 {
            self.tryToRead()
        } else {
            self.parent?.read()
        }

        // We may have generated automatic messages here.
        self.writePendingToMultiplexer()
    }

    public func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
        // Can't process this right now.
        if self.bufferOutboundEvent(.close(error, mode, promise)) {
            return
        }

        self._actuallyClose0(error: error, mode: mode, promise: promise)
    }

    private func _actuallyClose0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
        // If the stream is already closed, we can fail this early and abort processing. If it's not, we need to emit a
        // SSH_MSG_CHANNEL_CLOSE.
        guard !self.state.isClosed else {
            promise?.fail(ChannelError.alreadyClosed)
            return
        }

        switch mode {
        case .input:
            // We can't close the input in SSH.
            promise?.fail(ChannelError.operationUnsupported)
            return

        case .output:
            // Closing output turns into sending a frame.
            // This message needs to be buffered with the outbound I/O. It also counts as a flush (because
            // closing is normally not something that can be flushed).
            self.pendingWritesFromChannel.append((.eof, promise))
            self.flush0()
            return

        case .all:
            // Store the pending close promise: it'll be succeeded later.
            if let promise = promise {
                if let pendingPromise = self.pendingClosePromise {
                    pendingPromise.futureResult.cascade(to: promise)
                } else {
                    self.pendingClosePromise = promise
                }
            }

            if self.state.isActiveOnNetwork {
                // Gotta do some I/O.
                self.closedWhileOpen()
            } else {
                // The channel isn't active on the network, just go straight to closedCleanly.
                self.closedCleanly()
            }
        }
    }

    public func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
        if self.bufferOutboundEvent(.userOutboundEvent(event, promise)) {
            // Buffered, nothing to do here.
            return
        }

        self._actuallyTriggerOutboundEvent0(event, promise: promise)
    }

    private func _actuallyTriggerOutboundEvent0(_ event: Any, promise: EventLoopPromise<Void>?) {
        // There is no flush for user outbound events, so they're expensive. For now we live
        // with that.
        let message: SSHMessage

        switch event {
        case let event as SSHChannelRequestEvent.ExecRequest:
            message = SSHMessage(event, recipientChannel: self.state.remoteChannelIdentifier!)
        case let event as SSHChannelRequestEvent.EnvironmentRequest:
            message = SSHMessage(event, recipientChannel: self.state.remoteChannelIdentifier!)
        case let event as SSHChannelRequestEvent.ExitStatus:
            message = SSHMessage(event, recipientChannel: self.state.remoteChannelIdentifier!)
        case let event as SSHChannelRequestEvent.PseudoTerminalRequest:
            message = SSHMessage(event, recipientChannel: self.state.remoteChannelIdentifier!)
        case let event as SSHChannelRequestEvent.ShellRequest:
            message = SSHMessage(event, recipientChannel: self.state.remoteChannelIdentifier!)
        case let event as SSHChannelRequestEvent.ExitSignal:
            message = SSHMessage(event, recipientChannel: self.state.remoteChannelIdentifier!)
        case let event as SSHChannelRequestEvent.SubsystemRequest:
            message = SSHMessage(event, recipientChannel: self.state.remoteChannelIdentifier!)
        case let event as SSHChannelRequestEvent.WindowChangeRequest:
            message = SSHMessage(event, recipientChannel: self.state.remoteChannelIdentifier!)
        case let event as SSHChannelRequestEvent.LocalFlowControlRequest:
            message = SSHMessage(event, recipientChannel: self.state.remoteChannelIdentifier!)
        case let event as SSHChannelRequestEvent.SignalRequest:
            message = SSHMessage(event, recipientChannel: self.state.remoteChannelIdentifier!)
        case is ChannelSuccessEvent:
            message = .channelSuccess(.init(recipientChannel: self.state.remoteChannelIdentifier!))
        case is ChannelFailureEvent:
            message = .channelFailure(.init(recipientChannel: self.state.remoteChannelIdentifier!))
        default:
            promise?.fail(ChannelError.operationUnsupported)
            return
        }

        self.processOutboundMessage(message, promise: promise)
        self.writePendingToMultiplexer()
    }

    public func channelRead0(_: NIOAny) {
        // do nothing
    }

    public func errorCaught0(error: Error) {
        // do nothing
    }

    internal func configure(userPromise: EventLoopPromise<Channel>?, channelType: SSHChannelType) {
        self.type = channelType

        // We need to configure this channel. This involves doing four things:
        // 1. Setting our autoRead state from the parent
        // 2. Calling the initializer, if provided.
        // 3. Activating when complete.
        // 4. Catching errors if they occur.
        let f = self.parent!.getOption(ChannelOptions.autoRead).flatMap { autoRead -> EventLoopFuture<Void> in
            self.autoRead = autoRead
            let initializer = self.initializer
            self.initializer = nil
            return initializer?(self, channelType) ?? self.eventLoop.makeSucceededFuture(())
        }.map {
            self.initializerComplete(userPromise: userPromise)
        }

        f.whenFailure { (error: Error) in
            self.initializerFailed(error: error)
            userPromise?.fail(error)
        }
    }

    /// Activates this channel.
    fileprivate func performActivation() {
        // We validate that the parent is active, and that we're active on the network, and that we haven't
        // activated already before we proceed. This makes this method idempotent.
        guard let parent = self.parent, parent.isActive, self.state.isActiveOnNetwork, case .neverActivated = self.activationState else {
            return
        }

        self.notifyChannelActive()
        if !self.writabilityManager.isWritable {
            self.changeWritability(to: false)
        }
        self.unbufferOutboundEvents()
        self.tryToAutoRead()
        self.deliverPendingWrites()
        self.writePendingToMultiplexer()
        if let promise = self.userActivatePromise {
            self.userActivatePromise = nil
            promise.succeed(self)
        }
    }

    private func initializerComplete(userPromise: EventLoopPromise<Channel>?) {
        // We need to work out what we need to do once the initializer completes.
        self.userActivatePromise = userPromise

        if self.state.isActiveOnNetwork {
            // We need to send a channelOpenSuccess.
            let message = SSHMessage.ChannelOpenConfirmationMessage(recipientChannel: self.state.remoteChannelIdentifier!,
                                                                    senderChannel: self.state.localChannelIdentifier,
                                                                    initialWindowSize: self.windowManager.targetWindowSize,
                                                                    maximumPacketSize: 1 << 24) // This is a weirdly hard-coded choice.
            self.processOutboundMessage(.channelOpenConfirmation(message), promise: nil)
            self.writePendingToMultiplexer()
        } else if !self.state.isClosed {
            // We need to request the channel. We must have the channel by now.
            let message = SSHMessage.ChannelOpenMessage(type: .init(self.type!),
                                                        senderChannel: self.state.localChannelIdentifier,
                                                        initialWindowSize: self.windowManager.targetWindowSize,
                                                        maximumPacketSize: 1 << 24)
            self.processOutboundMessage(.channelOpen(message), promise: nil)
            self.writePendingToMultiplexer()
        } else {
            // Already closed, just fail this promise.
            self.closeFuture.cascadeFailure(to: userPromise)
        }
    }

    private func initializerFailed(error: Error) {
        // Tell the remote peer to go away.
        if self.state.isActiveOnNetwork {
            let message = SSHMessage.ChannelOpenFailureMessage(recipientChannel: self.state.remoteChannelIdentifier!,
                                                               reasonCode: 2,
                                                               description: "",
                                                               language: "en-US")
            self.processOutboundMessage(.channelOpenFailure(message), promise: nil)
            self.writePendingToMultiplexer()
        } else {
            self.errorEncountered(error: error)
        }
    }

    /// Called when the channel was closed from the pipeline while the stream is still open.
    ///
    /// Will emit a `SSH_MSG_CHANNEL_CLOSE` to close the channel.
    private func closedWhileOpen() {
        precondition(!self.state.isClosed)

        if self.state.sentClose {
            // Already sent a close, do nothing else.
            return
        }

        let message = SSHMessage.ChannelCloseMessage(recipientChannel: self.state.remoteChannelIdentifier!)
        self.processOutboundMessage(.channelClose(message), promise: nil)
        self.writePendingToMultiplexer()
    }

    private func closedCleanly() {
        precondition(!self.state.isActiveOnNetwork)

        // We use didClose as a gating mechanism: only one of closedCleanly
        // or errorEncountered ever gets to run. This is important, as errorEncountered
        // can actually trigger closedCleanly.
        if self.didClose {
            // We've already closed
            return
        }
        self.didClose = true

        self.deliverPendingReads()
        self.failPendingWrites(error: ChannelError.eof)
        self.failPendingOutboundEvents(error: ChannelError.eof)
        if let promise = self.pendingClosePromise {
            self.pendingClosePromise = nil
            promise.succeed(())
        }
        if let promise = self.userActivatePromise {
            // NOTE(cory): I don't think we can actually hit this? It's a useful fallback though: we have it in
            // case a programming error occurs, because it's semantically right and avoids a crash.
            self.userActivatePromise = nil
            promise.fail(ChannelError.eof)
        }

        self.notifyChannelInactive()

        self.eventLoop.execute {
            self.removeHandlers(pipeline: self.pipeline)
            self.closePromise.succeed(())
            self.multiplexer.childChannelClosed(channelID: self.state.localChannelIdentifier)
        }
    }

    fileprivate func errorEncountered(error: Error) {
        // We use didClose as a gating mechanism: only one of closedCleanly
        // or errorEncountered ever gets to run. This is important, as errorEncountered
        // can actually trigger closedCleanly.
        if self.didClose {
            return
        }
        self.didClose = true

        self.deliverPendingReads()
        self.failPendingWrites(error: error)
        self.failPendingOutboundEvents(error: error) // These all go away, but we're going to use our own!
        if let promise = self.pendingClosePromise {
            self.pendingClosePromise = nil
            promise.fail(error)
        }
        if let promise = self.userActivatePromise {
            self.userActivatePromise = nil
            promise.fail(error)
        }
        self.pipeline.fireErrorCaught(error)
        self.notifyChannelInactive()

        // Ok, we need to notify the network that we're done.
        if self.state.isActiveOnNetwork, !self.state.sentClose {
            let message = SSHMessage.ChannelCloseMessage(recipientChannel: self.state.remoteChannelIdentifier!)
            self.processOutboundMessage(.channelClose(message), promise: nil)
            self.writePendingToMultiplexer()
        }

        self.eventLoop.execute {
            self.removeHandlers(pipeline: self.pipeline)
            self.closePromise.fail(error)
            self.multiplexer.childChannelErrored(channelID: self.state.localChannelIdentifier, expectClose: !self.state.isClosed)
        }
    }

    private func tryToRead() {
        // If there's no read to satisfy, no worries about it.
        guard self.unsatisfiedRead else {
            return
        }

        // If we're not active, we will hold on to these reads.
        guard self.state.isActiveOnChannel else {
            return
        }

        // If there are no pending reads, do nothing.
        guard self.pendingReads.count > 0 else {
            return
        }

        // Ok, we're satisfying a read here.
        self.unsatisfiedRead = false
        self.deliverPendingReads()
        self.tryToAutoRead()
    }

    private func changeWritability(to newWritability: Bool) {
        self._isWritable.store(newWritability, ordering: .relaxed)
        self.pipeline.fireChannelWritabilityChanged()
    }

    private func tryToAutoRead() {
        if self.autoRead {
            // If auto-read is turned on, recurse into channelPipeline.read().
            // This cannot recurse indefinitely unless frames are being delivered
            // by the read stacks, which is generally fairly unlikely to continue unbounded.
            self.pipeline.read()
        }
    }
}

// MARK: - Functions used to manage pending reads and writes.

private extension SSHChildChannel {
    /// Deliver reads to the channel.
    ///
    /// This is sometimes done when the channel itself is closed, because data loss in these circumstances is unacceptable.
    private func deliverPendingReads() {
        while self.pendingReads.count > 0 {
            self.deliverSingleRead(self.pendingReads.removeFirst())
        }
        self.pipeline.fireChannelReadComplete()
    }

    private func deliverSingleRead(_ data: PendingContent) {
        switch data {
        case .data(let data):
            // We only futz with the window manager if the channel is not already closed.
            if !self.didClose, !self.state.sentClose, let increment = self.windowManager.unbufferBytes(data.data.readableBytes) {
                let update = SSHMessage.ChannelWindowAdjustMessage(recipientChannel: self.state.remoteChannelIdentifier!, bytesToAdd: UInt32(increment))
                self.processOutboundMessage(.channelWindowAdjust(update), promise: nil)
            }
            self.pipeline.fireChannelRead(NIOAny(data))

        case .eof:
            self.pipeline.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
        }
    }

    /// Delivers all pending flushed writes to the parent channel.
    private func deliverPendingWrites() {
        while self.pendingWritesFromChannel.hasMark, self.writabilityManager.windowSpaceOnNetwork > 0, var write = self.pendingWritesFromChannel.first {
            let maxWriteLength = min(self.writabilityManager.windowSpaceOnNetwork, Int(self.peerMaxMessageSize))
            let (actualWrite, excess) = write.0.trim(maxLength: maxWriteLength)
            write.0 = actualWrite

            if let excess = excess {
                // We need to push the excess back on to the queue.
                self.pendingWritesFromChannel[self.pendingWritesFromChannel.startIndex] = (excess, write.1)
                write.1 = nil
            } else {
                // Drop the first write.
                _ = self.pendingWritesFromChannel.popFirst()
            }

            let writeSize = write.0.readableBytes

            if case .changed(newValue: let value) = self.writabilityManager.wroteBytes(writeSize) {
                self.changeWritability(to: value)
            }

            self.processOutboundMessage(write.0, promise: write.1)
        }
    }

    /// Fails all pending writes with the given error.
    private func failPendingWrites(error: Error) {
        while self.pendingWritesFromChannel.count > 0 {
            self.pendingWritesFromChannel.removeFirst().1?.fail(error)
        }
    }
}

// MARK: - Inbound message handling.

extension SSHChildChannel {
    /// Called when a frame is received from the network.
    ///
    /// - parameters:
    ///     - frame: The `SSHMessage` received from the network.
    func receiveInboundMessage(_ message: SSHMessage) {
        do {
            switch message {
            case .channelOpen(let message):
                try self.handleInboundChannelOpen(message)
            case .channelOpenConfirmation(let message):
                try self.handleInboundChannelOpenConfirmation(message)
            case .channelOpenFailure(let message):
                try self.handleInboundChannelOpenFailure(message)
            case .channelEOF(let message):
                try self.handleInboundChannelEOF(message)
            case .channelClose(let message):
                try self.handleInboundChannelClose(message)
            case .channelWindowAdjust(let message):
                try self.handleInboundChannelWindowAdjust(message)
            case .channelData(let message):
                try self.handleInboundChannelData(message)
            case .channelExtendedData(let message):
                try self.handleInboundChannelExtendedData(message)
            case .channelRequest(let message):
                try self.handleInboundChannelRequest(message)
            case .channelSuccess(let message):
                try self.handleInboundChannelSuccess(message)
            case .channelFailure(let message):
                try self.handleInboundChannelFailure(message)
            default:
                preconditionFailure("Channels only handle channel messages")
            }
        } catch {
            self.errorEncountered(error: error)
        }
    }

    private func handleInboundChannelOpen(_ message: SSHMessage.ChannelOpenMessage) throws {
        self.state.receiveChannelOpen(message)

        // Window size starts at zero, so we treat this as an increment. However, we disregard whether this changed
        // the writability value, as we lie about writability until we're active anyway.
        _ = try self.writabilityManager.outboundWindowIncremented(message.initialWindowSize)
        self.peerMaxMessageSize = message.maximumPacketSize

        // If we got through the state machine, this is a channel opening maneuver. We can
        // now kick off the initializer. Inbound channels never have user promises.
        self.configure(userPromise: nil, channelType: SSHChannelType(message))
    }

    private func handleInboundChannelOpenConfirmation(_ message: SSHMessage.ChannelOpenConfirmationMessage) throws {
        try self.state.receiveChannelOpenConfirmation(message)

        // Window size starts at zero, so we treat this as an increment. However, we disregard whether this changed
        // the writability value, as we lie about writability until we're active anyway.
        _ = try self.writabilityManager.outboundWindowIncremented(message.initialWindowSize)
        self.peerMaxMessageSize = message.maximumPacketSize

        self.performActivation()
    }

    private func handleInboundChannelOpenFailure(_ message: SSHMessage.ChannelOpenFailureMessage) throws {
        try self.state.receiveChannelOpenFailure(message)
        self.errorEncountered(error: NIOSSHError.channelSetupRejected(reasonCode: message.reasonCode, reason: message.description))
    }

    private func handleInboundChannelEOF(_ message: SSHMessage.ChannelEOFMessage) throws {
        try self.state.receiveChannelEOF(message)

        // If we don't support half-closure, this is a full close.
        // Otherwise, it's a half-closure.
        if self.allowRemoteHalfClosure {
            // Hey, remote half-closure is allowed! That's handy! We queue this with the reads to avoid it being re-ordered.
            self.pendingReads.append(.eof)
        } else {
            // We don't support remote half-closure. That puts us in a bit of a bind. We have to promote this up to full-closure.
            // We need to send a channel close, so let's just do that: the outbound state machine will make this a full-closure.
            let closeMessage = SSHMessage.channelClose(.init(recipientChannel: self.state.remoteChannelIdentifier!))
            self.processOutboundMessage(closeMessage, promise: nil)
        }
    }

    private func handleInboundChannelClose(_ message: SSHMessage.ChannelCloseMessage) throws {
        try self.state.receiveChannelClose(message)

        // If we didn't throw, this must be acceptable to process.
        if self.state.isClosed {
            self.closedCleanly()
        } else {
            // We need to issue a close immediately.
            let closeMessage = SSHMessage.channelClose(.init(recipientChannel: self.state.remoteChannelIdentifier!))
            self.processOutboundMessage(closeMessage, promise: nil)
        }
    }

    private func handleInboundChannelWindowAdjust(_ message: SSHMessage.ChannelWindowAdjustMessage) throws {
        try self.state.receiveChannelWindowAdjust(message)
        if case .changed(let newValue) = try self.writabilityManager.outboundWindowIncremented(message.bytesToAdd) {
            self.changeWritability(to: newValue)
        }

        // There may be writes to deliver, deliver them.
        self.deliverPendingWrites()
    }

    private func handleInboundChannelData(_ message: SSHMessage.ChannelDataMessage) throws {
        // Validate this data in the state machine.
        try self.state.receiveChannelData(message)

        // State machine is happy. Handle the flow control.
        try self.windowManager.bufferFlowControlledBytes(message.data.readableBytes)
        self.pendingReads.append(.data(.init(message)))
    }

    private func handleInboundChannelExtendedData(_ message: SSHMessage.ChannelExtendedDataMessage) throws {
        try self.state.receiveChannelExtendedData(message)

        // State machine is happy. Handle the flow control.
        try self.windowManager.bufferFlowControlledBytes(message.data.readableBytes)
        self.pendingReads.append(.data(.init(message)))
    }

    private func handleInboundChannelRequest(_ message: SSHMessage.ChannelRequestMessage) throws {
        try self.state.receiveChannelRequest(message)

        if let userEvent = SSHChannelRequestEvent.fromMessage(message) {
            self.pipeline.fireUserInboundEventTriggered(userEvent)
        } else if message.wantReply {
            // Messages that we don't understand and that want a reply must be replied to with a failure.
            // We can force-unwrap here because we must have a remote channel identifier to have received a request.
            let replyMessage = SSHMessage.channelFailure(.init(recipientChannel: self.state.remoteChannelIdentifier!))
            self.processOutboundMessage(replyMessage, promise: nil)
        }
    }

    private func handleInboundChannelSuccess(_ message: SSHMessage.ChannelSuccessMessage) throws {
        try self.state.receiveChannelSuccess(message)
        self.pipeline.fireUserInboundEventTriggered(ChannelSuccessEvent())
    }

    private func handleInboundChannelFailure(_ message: SSHMessage.ChannelFailureMessage) throws {
        try self.state.receiveChannelFailure(message)
        self.pipeline.fireUserInboundEventTriggered(ChannelFailureEvent())
    }
}

// MARK: Outbound message handling.

extension SSHChildChannel {
    /// Called to send a message to the network.
    ///
    /// For flow controlled messages, this may only be invoked once flow control has been
    /// satisfied.
    ///
    /// - parameters:
    ///     - frame: The `SSHMessage` to send to the network.
    ///     - promise: The promise associated with the message write, if any.
    private func processOutboundMessage(_ message: SSHMessage, promise: EventLoopPromise<Void>?) {
        do {
            switch message {
            case .channelOpen(let message):
                try self.handleOutboundChannelOpen(message, promise)
            case .channelOpenConfirmation(let message):
                try self.handleOutboundChannelOpenConfirmation(message, promise)
            case .channelOpenFailure(let message):
                try self.handleOutboundChannelOpenFailure(message, promise)
            case .channelEOF(let message):
                try self.handleOutboundChannelEOF(message, promise)
            case .channelClose(let message):
                try self.handleOutboundChannelClose(message, promise)
            case .channelWindowAdjust(let message):
                try self.handleOutboundChannelWindowAdjust(message, promise)
            case .channelData(let message):
                try self.handleOutboundChannelData(message, promise)
            case .channelExtendedData(let message):
                try self.handleOutboundChannelExtendedData(message, promise)
            case .channelRequest(let message):
                try self.handleOutboundChannelRequest(message, promise)
            case .channelSuccess(let message):
                try self.handleOutboundChannelSuccess(message, promise)
            case .channelFailure(let message):
                try self.handleOutboundChannelFailure(message, promise)
            default:
                preconditionFailure("Channels only handle channel messages")
            }
        } catch {
            // Oof, state machine didn't like this much.
            promise?.fail(error)
            self.errorEncountered(error: error)
            return
        }
    }

    /// A helper function for transforming `SSHChannelData` into `SSHMessage`s before processing.
    private func processOutboundMessage(_ content: PendingContent, promise: EventLoopPromise<Void>?) {
        let recipientChannel = self.state.remoteChannelIdentifier!

        switch content {
        case .data(let message):
            self.processOutboundMessage(.init(message, recipientChannel: recipientChannel), promise: promise)
        case .eof:
            self.processOutboundMessage(.channelEOF(.init(recipientChannel: recipientChannel)), promise: promise)
        }
    }

    private func handleOutboundChannelOpen(_ message: SSHMessage.ChannelOpenMessage, _ promise: EventLoopPromise<Void>?) throws {
        self.state.sendChannelOpen(message)
        self.pendingWritesForMultiplexer.append((.channelOpen(message), promise))
    }

    private func handleOutboundChannelOpenConfirmation(_ message: SSHMessage.ChannelOpenConfirmationMessage, _ promise: EventLoopPromise<Void>?) throws {
        self.state.sendChannelOpenConfirmation(message)
        self.pendingWritesForMultiplexer.append((.channelOpenConfirmation(message), promise))
        self.performActivation()
    }

    private func handleOutboundChannelOpenFailure(_ message: SSHMessage.ChannelOpenFailureMessage, _ promise: EventLoopPromise<Void>?) throws {
        self.state.sendChannelOpenFailure(message)
        self.pendingWritesForMultiplexer.append((.channelOpenFailure(message), promise))
        self.errorEncountered(error: NIOSSHError.channelSetupRejected(reasonCode: message.reasonCode, reason: message.description))
    }

    private func handleOutboundChannelEOF(_ message: SSHMessage.ChannelEOFMessage, _ promise: EventLoopPromise<Void>?) throws {
        try self.state.sendChannelEOF(message)
        self.pendingWritesForMultiplexer.append((.channelEOF(message), promise))

        self.failPendingWrites(error: ChannelError.eof)
        self.pipeline.fireUserInboundEventTriggered(ChannelEvent.outputClosed)
    }

    private func handleOutboundChannelClose(_ message: SSHMessage.ChannelCloseMessage, _ promise: EventLoopPromise<Void>?) throws {
        try self.state.sendChannelClose(message)
        self.pendingWritesForMultiplexer.append((.channelClose(message), promise))

        // If we didn't throw, this must be acceptable to process.
        if self.state.isClosed {
            self.closedCleanly()
        }
    }

    private func handleOutboundChannelWindowAdjust(_ message: SSHMessage.ChannelWindowAdjustMessage, _ promise: EventLoopPromise<Void>?) throws {
        try self.state.sendChannelWindowAdjust(message)
        self.pendingWritesForMultiplexer.append((.channelWindowAdjust(message), promise))
    }

    private func handleOutboundChannelData(_ message: SSHMessage.ChannelDataMessage, _ promise: EventLoopPromise<Void>?) throws {
        // Validate this data in the state machine.
        try self.state.sendChannelData(message)
        self.pendingWritesForMultiplexer.append((.channelData(message), promise))
    }

    private func handleOutboundChannelExtendedData(_ message: SSHMessage.ChannelExtendedDataMessage, _ promise: EventLoopPromise<Void>?) throws {
        try self.state.sendChannelExtendedData(message)
        self.pendingWritesForMultiplexer.append((.channelExtendedData(message), promise))
    }

    private func handleOutboundChannelRequest(_ message: SSHMessage.ChannelRequestMessage, _ promise: EventLoopPromise<Void>?) throws {
        try self.state.sendChannelRequest(message)
        self.pendingWritesForMultiplexer.append((.channelRequest(message), promise))
    }

    private func handleOutboundChannelSuccess(_ message: SSHMessage.ChannelSuccessMessage, _ promise: EventLoopPromise<Void>?) throws {
        try self.state.sendChannelSuccess(message)
        self.pendingWritesForMultiplexer.append((.channelSuccess(message), promise))
    }

    private func handleOutboundChannelFailure(_ message: SSHMessage.ChannelFailureMessage, _ promise: EventLoopPromise<Void>?) throws {
        try self.state.sendChannelFailure(message)
        self.pendingWritesForMultiplexer.append((.channelFailure(message), promise))
    }

    /// Write any pending messages to the multiplexer. By definition all pending writes are flushed.
    private func writePendingToMultiplexer() {
        var didWrite = false
        while let (write, promise) = self.pendingWritesForMultiplexer.popFirst() {
            didWrite = true
            self.multiplexer.writeFromChannel(write, promise)
        }

        if didWrite {
            self.multiplexer.childChannelFlush()
        }
    }
}

// MARK: General messages from the multiplexer

extension SSHChildChannel {
    func receiveParentChannelReadComplete() {
        self.tryToRead()

        /// All pending writes for the multiplexer are inherently flushed. Get rid of them
        /// here in case we auto-generated any.
        self.writePendingToMultiplexer()
    }

    func parentChannelWritabilityChanged(newValue: Bool) {
        guard case .changed(newValue: let localValue) = self.writabilityManager.parentWritabilityChanged(newValue) else {
            return
        }

        // Ok, the writability changed.
        if self.state.isActiveOnChannel {
            self.changeWritability(to: localValue)
        }
    }

    func parentChannelInactive() {
        // The parent has gone inactive. If we're already closed, do nothing. Otherwise, this is an error that forces us directly into
        // closed.
        if !self.state.isClosed {
            self.state.receiveTCPEOF()
            self.errorEncountered(error: NIOSSHError.tcpShutdown)
        }
    }
}

// MARK: Activation state management

extension SSHChildChannel {
    /// An enum to keep track of whether we've notified the channel of activation or not.
    private enum ActivationState {
        case neverActivated
        case activated
        case deactivated
    }

    /// This function handles the state required to notify the channel of activity. It can safely
    /// be called repeatedly, and will only activate the channel once.
    func notifyChannelActive() {
        switch self.activationState {
        case .neverActivated:
            self.activationState = .activated
            self._isActive.store(true, ordering: .relaxed)
            self.pipeline.fireChannelActive()

        case .activated:
            assert(self._isActive.load(ordering: .relaxed) == true)

        case .deactivated:
            assert(self._isActive.load(ordering: .relaxed) == false)
        }
    }

    func notifyChannelInactive() {
        switch self.activationState {
        case .neverActivated:
            // Do nothing, transition to inactive.
            self.activationState = .deactivated
            assert(self._isActive.load(ordering: .relaxed) == false)

        case .activated:
            self.activationState = .deactivated
            self._isActive.store(false, ordering: .relaxed)
            self.pipeline.fireChannelInactive()

        case .deactivated:
            assert(self._isActive.load(ordering: .relaxed) == false)
        }
    }
}

// MARK: Outbound control event buffering

extension SSHChildChannel {
    /// A representation of the potential buffered outbound channel operations.
    ///
    /// These will be replayed after the channel becomes active.
    ///
    /// This does not apply to writes or flushes, which have their own separate pipeline.
    enum OutboundControlOperation {
        case userOutboundEvent(Any, EventLoopPromise<Void>?)
        case close(Error, CloseMode, EventLoopPromise<Void>?)
    }

    /// Try to buffer an outbound control operation. Returns `true` if the operation was buffered.
    ///
    /// We buffer outbound control operations when we're waiting for activation, or when we still have
    /// buffered outbound events (as we're presumably unbuffering them). Otherwise, we don't.
    fileprivate func bufferOutboundEvent(_ event: OutboundControlOperation) -> Bool {
        if self.state.awaitingActivation || self.bufferedOutboundOperations.count > 0 {
            self.bufferedOutboundOperations.append(event)
            return true
        } else {
            return false
        }
    }

    fileprivate func unbufferOutboundEvents() {
        precondition(!self.state.awaitingActivation)

        while let operation = self.bufferedOutboundOperations.popFirst() {
            switch operation {
            case .close(let error, let mode, let promise):
                self._actuallyClose0(error: error, mode: mode, promise: promise)
            case .userOutboundEvent(let event, let promise):
                self._actuallyTriggerOutboundEvent0(event, promise: promise)
            }
        }
    }

    fileprivate func failPendingOutboundEvents(error: Error) {
        while let operation = self.bufferedOutboundOperations.popFirst() {
            switch operation {
            case .close(_, _, let promise),
                 .userOutboundEvent(_, let promise):

                promise?.fail(error)
            }
        }
    }
}

extension SSHChildChannel {
    fileprivate enum PendingContent {
        case data(SSHChannelData)
        case eof

        var readableBytes: Int {
            switch self {
            case .data(let d):
                return d.data.readableBytes
            case .eof:
                return 0
            }
        }

        func trim(maxLength: Int) -> (trimmed: PendingContent, excess: PendingContent?) {
            guard case .data(var channelData) = self else {
                // No need to trim EOF
                return (trimmed: self, excess: nil)
            }

            // No need to trim here either.
            if channelData.data.readableBytes <= maxLength {
                return (trimmed: self, excess: nil)
            }

            // Ok, gotta trim.
            let prefix = SSHChannelData(type: channelData.type, data: channelData.data.slicePrefix(maxLength))
            return (trimmed: .data(prefix), excess: .data(channelData))
        }
    }
}

extension SSHChildChannel {
    internal struct SynchronousOptions: NIOSynchronousChannelOptions {
        private let channel: SSHChildChannel

        fileprivate init(channel: SSHChildChannel) {
            self.channel = channel
        }

        /// Set `option` to `value` on this `Channel`.
        ///
        /// - Important: Must be called on the `EventLoop` the `Channel` is running on.
        internal func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) throws {
            try self.channel.setOption0(option, value: value)
        }

        /// Get the value of `option` for this `Channel`.
        ///
        /// - Important: Must be called on the `EventLoop` the `Channel` is running on.
        internal func getOption<Option: ChannelOption>(_ option: Option) throws -> Option.Value {
            try self.channel.getOption0(option)
        }
    }

    /// Returns a view of the `Channel` exposing synchronous versions of `setOption` and `getOption`.
    public var syncOptions: NIOSynchronousChannelOptions? {
        SynchronousOptions(channel: self)
    }
}

private extension IOData {
    mutating func slicePrefix(_ length: Int) -> IOData {
        assert(length < self.readableBytes)

        switch self {
        case .byteBuffer(var buf):
            // This force-unwrap is safe, as we only invoke this when we have already checked the length.
            let newBuf = buf.readSlice(length: length)!
            self = .byteBuffer(buf)
            return .byteBuffer(newBuf)

        case .fileRegion(var region):
            let newRegion = FileRegion(fileHandle: region.fileHandle, readerIndex: region.readerIndex, endIndex: region.readerIndex + length)
            region.moveReaderIndex(forwardBy: length)
            self = .fileRegion(region)
            return .fileRegion(newRegion)
        }
    }
}
